Step Functions + Batchで並列処理するステートマシンを作ってみた
こんにちは。AWS事業本部のKyoです。
前回に続き、Step Functions + Batchで科学計算をイメージしつつ、できるだけシンプルなステートマシンを構築してみます。 今回は並列処理を実装するため、「Parallel」というステートを利用します。
何をやるのか
並列計算を行い、最後にそれらの計算結果を入力とした計算を行います。
詳細
前回作成したBatchのジョブ定義 Incrementを今回も利用します。
Incrementは1つの入力ファイルから1つの出力ファイルを生成します。入力ファイルには任意の数字が含まれており、そこに「1」を足したものを出力ファイルに書き込みます。入力・出力ファイルともにS3の任意のパスからダウンロード・アップロードします。
Parallelでの並列計算には2種類のブランチA, Bが存在し、任意の回数のIncrementを行います。Aでは2回、Bでは1回行う設定です。
最後に行うSumは2つの入力ファイルから与えられる数値を合計し、1つの出力ファイルを生成します。これも入力・出力ファイルともにS3の任意のパスからダウンロード・アップロードします。
準備
Sumについて説明します。
2つの入力ファイルから1つの出力ファイルを生成します。入力ファイルには任意の数字が含まれており、それらの和を出力ファイルに書き込みます。スクリプト自体はIncrementとほぼ同じで、計算のロジックとそれに関わるログやファイル取得が多少変化しています。
コンテナをビルド、ECRにプッシュし、Batchのジョブ定義も作成しておきます。
また、ステートマシンで利用する入力ファイルもS3にアップロードします。
Dockerfile
FROM amazonlinux:2 # Install AWS CLI v2 RUN yum install -y unzip less && \ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \ unzip awscliv2.zip && \ ./aws/install WORKDIR /usr/local/bin ADD sum.sh sum.sh CMD [ "sh", "sum.sh" ]
スクリプト
INPUT_FILE_NAME_1=$(basename $INPUT_S3_URI_1) INPUT_FILE_NAME_2=$(basename $INPUT_S3_URI_2) OUTPUT_FILE_NAME=$(basename $OUTPUT_S3_URI) echo "START: $STEP_NAME" # S3から入力ファイルをダウンロード aws s3 cp $INPUT_S3_URI_1 $INPUT_FILE_NAME_1 || exit 1 aws s3 cp $INPUT_S3_URI_2 $INPUT_FILE_NAME_2 || exit 1 # 入力ファイル内の数値を表示 input_num_1=`cat $INPUT_FILE_NAME_1` input_num_2=`cat $INPUT_FILE_NAME_2` echo "input1: $input_num_1, input2: $input_num_2" # 計算 output_num=$(( $input_num_1 + $input_num_2 )) echo "output: $output_num" # 計算結果を出力ファイルへ echo $output_num >$OUTPUT_FILE_NAME # S3へファイルをアップロード aws s3 cp $OUTPUT_FILE_NAME $OUTPUT_S3_URI || exit 1 echo "COMPLETE: $STEP_NAME"
ジョブ定義
- Fargate起動タイプ
- 1 vCPU, 2048 メモリ
- ジョブロールとして、「S3FullAccess」ポリシーをもったロール
ステートマシンで利用する入力ファイル
以下の2種類のファイルをS3にアップロードしておきました。パスは任意ですが、後ほど利用するのでメモしておきます。
- A_in.txt
- ファイルの中身は「2」
- B_in.txt
- ファイルの中身は「1」
やってみる
ステートマシン
今回もVSCode + AWS Toolkitを利用しています。
Parallelでのブランチはネストする形で表現されていますね。また、各ブランチの終わりにはEnd: true
が宣言されています。
Comment: >- Second state machine. StartAt: Parallel States: Parallel: Type: Parallel ResultPath: null Next: Sum Branches: - StartAt: A_1st_Increment States: A_1st_Increment: Type: Task Resource: 'arn:aws:states:::batch:submitJob.sync' ResultPath: null Parameters: JobName.$: $$.State.Name JobDefinition.$: $.params.jobdefs.increment JobQueue.$: $.params.queue ContainerOverrides: Environment: - Name: STEP_NAME Value.$: $$.State.Name - Name: INPUT_S3_URI Value.$: $.params.environment.A_INPUT_S3_URI - Name: OUTPUT_S3_URI Value.$: $.params.environment.A_INTERMEDIATE_S3_URI_1 Command: - 'sh' - 'increment.sh' Next: A_2nd_Increment A_2nd_Increment: Type: Task Resource: arn:aws:states:::batch:submitJob.sync ResultPath: null Parameters: JobName.$: $$.State.Name JobDefinition.$: $.params.jobdefs.increment JobQueue.$: $.params.queue ContainerOverrides: Environment: - Name: STEP_NAME Value.$: $$.State.Name - Name: INPUT_S3_URIA Value.$: $.params.environment.A_INTERMEDIATE_S3_URI_1 - Name: OUTPUT_S3_URI Value.$: $.params.environment.A_OUTPUT_S3_URI Command: - 'sh' - 'increment.sh' End: true - StartAt: B_1st_Increment States: B_1st_Increment: Type: Task Resource: arn:aws:states:::batch:submitJob.sync ResultPath: null Parameters: JobName.$: $$.State.Name JobDefinition.$: $.params.jobdefs.increment JobQueue.$: $.params.queue ContainerOverrides: Environment: - Name: STEP_NAME Value.$: $$.State.Name - Name: INPUT_S3_URI Value.$: $.params.environment.B_INPUT_S3_URI - Name: OUTPUT_S3_URI Value.$: $.params.environment.B_OUTPUT_S3_URI Command: - 'sh' - 'increment.sh' End: true Sum: Type: Task Resource: arn:aws:states:::batch:submitJob.sync ResultPath: null Parameters: JobName.$: $$.State.Name JobDefinition.$: $.params.jobdefs.sum JobQueue.$: $.params.queue ContainerOverrides: Environment: - Name: STEP_NAME Value.$: $$.State.Name - Name: INPUT_S3_URI_1 Value.$: $.params.environment.A_OUTPUT_S3_URI - Name: INPUT_S3_URI_2 Value.$: $.params.environment.B_OUTPUT_S3_URI - Name: OUTPUT_S3_URI Value.$: $.params.environment.SUM_OUTPUT_S3_URI Command: - 'sh' - 'sum.sh' End: true
入力
以下のようなJSONを入力とします(値はダミーです)。params
で 利用するキューとジョブ定義を、environment
で各ブランチの入出力を定義しています。A_INPUT_S3_URI
およびB_INPUT_S3_URI
は先程アップロードした入力ファイルのURIです。A_INTERMEDIATE_S3_URI_1
はブランチAの計算途中(A_1st_Increment)の結果をアップロードするURIです。
{ "params": { "queue": "arn:aws:batch:ap-northeast-1:123456789012:job-queue/fargate-batch-queu", "jobdefs": { "increment": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/increment-job-def:1", "sum": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/sum-job-def:1" }, "environment": { "A_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/A/a_in.txt", "A_INTERMEDIATE_S3_URI_1": "s3://my-sfs-bucket-123456789012/output/A/a_intermediate_1.txt", "A_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/A/a_out.txt", "B_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/B/b_in.txt", "B_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/B/b_out.txt", "SUM_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/Sum/sum_out.txt" } } }
結果
無事にステートマシンの実行が終わりました。
A, B両ブランチの計算結果であるA_OUTPUT_S3_URI
, B_OUTPUT_S3_URI
を見てみるとそれぞれ、「4」, 「2」でした。さらにSUMの結果であるSUM_OUTPUT_S3_URI
は「6」 でした。A, B両ブランチの和になっているので、想定どおりの動きですね。
少しハマったところ
以下のエラーメッセージに遭遇しました(上記コードでは対応済み)。
The JSONPath '$.params.jobdefs.sum' specified for the field 'JobDefinition.$' could not be found in the input
原因はParalleの出力が配列であることでした。今回はブランチが2つだったので、要素が2つのJSON配列です。
[ { "params": { "queue": "arn:aws:batch:ap-northeast-1:123456789012:job-queue/fargate-batch-queu", "jobdefs": { "increment": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/my-calc-job-def:1", "sum": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/sum-job-def:1" }, "environment": { "A_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/A/a_in.txt", "A_INTERMEDIATE_S3_URI_1": "s3://my-sfs-bucket-123456789012/output/A/a_intermediate_1.txt", "A_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/A/a_out.txt", "B_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/B/b_in.txt", "B_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/B/b_out.txt", "SUM_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/Sum/sum_out.txt" } } }, { "params": { "queue": "arn:aws:batch:ap-northeast-1:123456789012:job-queue/fargate-batch-queu", "jobdefs": { "increment": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/my-calc-job-def:1", "sum": "arn:aws:batch:ap-northeast-1:123456789012:job-definition/sum-job-def:1" }, "environment": { "A_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/A/a_in.txt", "A_INTERMEDIATE_S3_URI_1": "s3://my-sfs-bucket-123456789012/output/A/a_intermediate_1.txt", "A_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/A/a_out.txt", "B_INPUT_S3_URI": "s3://my-sfs-bucket-123456789012/input/B/b_in.txt", "B_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/B/b_out.txt", "SUM_OUTPUT_S3_URI": "s3://my-sfs-bucket-123456789012/output/Sum/sum_out.txt" } } } ]
対応
2種類の対応があります。
- Paralleで
ResultPath: null
を指定する - JSONPath式を利用する
1.はParalleの出力を利用しないという設定です。ブランチの中ではなく、Paralleに設定する必要があることに注意してください。 2.はParalleの出力から必要な部分を抜いてくるイメージです。
1つめの要素からSumのジョブ定義Arnを取得するなら、以下のようになります。
$.[0].params.jobdefs.sum
JSONPathの操作はKinesisのドキュメントにJSONPathの操作の操作がまとまっていたので参考になりました。また、StepFunctionsのデータフローシミュレーターを使って、実際の入力に対してどのようなJSONPath式で欲しいデータが取得できるのか試すこともできます。
今回は後続の処理がなかったので、1. のParalleでResultPath: null
で対応しています。
おわりに
今回はStep Functions + Batchを利用して並列処理および、その結果をまとめる処理を実装してみました。
本エントリではParallelを1つ利用しましたが、複数利用することでより複雑なステートマシンも組めるようになります。
以上、なにかのお役に立てれば幸いです。